﻿"""WSSE UserToken HTTP Header generation

`UserToken authentication`_ is a standard from the OASIS Web Services Security
technical committee.  It was devised to provide secure authentication for SOAP,
but was applied to early versions of AtomPub by putting the time stamp, nonce,
and message digest values in an X-WSSE HTTP header.

To generate the headers, a nonce (a random value used only once), a time
stamp - a string in `a certain ISO-8601 format`_ - and a secret (e.g., a
password) are concatenated in that order, and the SHA-1 message digest of the
resulting string becomes the digest value of the X-WSSE HTTP header.  The user
name and time stamp are passed in UTF-8.  The digest and nonce are passed
base64-encoded.

Note that the `Mark Pilgrim article`_ on www.xml.com does not show the nonce
base64-encoded, although it is in this and other implementations.  Trivia:
Pilgrim is `not a big fan`_ of WSSE UserToken.

The HTTP headers are generated by WSSEUserTokenHeaderGenerator objects.

.. _`UserToken authentication`:
   http://www.oasis-open.org/committees/download.php/16782/wss-v1.1-spec-os-UsernameTokenProfile.pdf
.. _`a certain ISO-8601 format`: http://www.w3.org/TR/NOTE-datetime
.. _`Mark Pilgrim article`: http://www.xml.com/pub/a/2003/12/17/dive.html
.. _`not a big fan`: http://www.imc.org/atom-syntax/mail-archive/msg06401.html
"""
# Copyright (c) 2009 Teleport Corp Pty Ltd.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

# Author: Troy Farrell <troy@entheossoft.com>
# Created: 20090806

import base64
import datetime
import os

try:
    import hashlib
except ImportError:
    import sha

# _UTCTzinfo is used in WSSEUserTokenHeaderGenerator._generate_timestamp to put
# time zone information on a datetime object.

class _UTCTzinfo(datetime.tzinfo):
    """Fixed tzinfo describing Coordinated Universal Time (UTC).

    Every method ignores the datetime it is handed and returns a constant:
    a zero offset, a zero daylight saving adjustment and the name "UTC".
    """

    # The class declares no instance attributes.
    __slots__ = []

    def tzname(self, datetime_):
        "Return the time zone name, which is always 'UTC'."
        name = "UTC"
        return name

    def utcoffset(self, datetime_):
        "Return the offset from UTC, which is always a zero timedelta."
        no_offset = datetime.timedelta()
        return no_offset

    def dst(self, datetime_):
        "Return the daylight saving adjustment, always a zero timedelta."
        no_adjustment = datetime.timedelta(seconds=0)
        return no_adjustment

class WSSEUserTokenHeaderGenerator(object):
    """Generator of WSSE User Token authentication HTTP headers.

    This generator takes a user name and secret and will generate the nonce and
    time stamp necessary to create UserToken authentication headers.  Headers
    are returned as a Python dictionary, suitable for passing to a Request
    object.

    As a generator, calls to next() will generate a new nonce and time stamp
    and return a new set of headers.  Before any call to next, set_nonce and
    set_timestamp may be called to set those parameters for the next set of
    headers.  (Calling set_nonce or set_timestamp will not change the
    current headers.  You must call next after calling set_nonce or
    set_timestamp to get headers with the specified nonce and/or timestamp.)
    Calls to get_headers will return the most recently generated headers.

    The digest is the base64-encoded SHA-1 of the nonce, time stamp and
    secret, concatenated in that order.  Byte strings are hashed as given;
    text (unicode) values are encoded as UTF-8 first.

    Usage:

    >>> import wsseut
    >>> ut = wsseut.WSSEUserTokenHeaderGenerator("User name", "Password")
    >>> headers = ut.get_headers()
    >>> assert 'WSSE profile="UsernameToken"' == headers["Authorization"]
    >>> assert 'UsernameToken Username="User name"' in headers["X-WSSE"]
    >>> assert headers == ut.get_headers() # get_headers gets current headers.
    >>> assert headers != ut.next()        # next makes new headers.
    >>> assert ut.next() != ut.next()      # Yes, next makes new headers.
    """

    _tzinfo = _UTCTzinfo()

    # Initialization

    def __init__(self, username, secret, nonce=None, timestamp=None):
        """Initialize the WSSE User Token.

        username - the user name placed in the X-WSSE header
        secret - the secret (e.g., a password) mixed into the digest
        nonce - optional string of random bytes for the first set of
                headers; it is base64-encoded for the header.  When None, a
                random nonce is generated.
        timestamp - optional time stamp for the first set of headers: a
                    string in iso8601 format or a datetime.datetime.  When
                    None, the current UTC time is used.
        """
        assert username is not None
        assert secret is not None

        self._digest = None
        self._encodednonce = None
        self._headers = None
        self._nonce = None
        self._secret = secret
        self._timestamp = None
        self._username = username

        self.set_nonce(nonce)
        self.set_timestamp(timestamp)

        self._clear_headers()

    # Iteration

    def __iter__(self):
        "Return this object as an iterator."
        return self

    def next(self):
        "Generate a new WSSE User Token and return its headers."
        self._clear_headers()
        self._generate_everything()
        return self.get_headers()

    # Python 3 spells the iterator protocol method __next__.
    __next__ = next

    # Public methods

    def get_headers(self):
        """Get a dictionary of headers for authentication.

        The headers are built on the first call and the same dictionary is
        returned by later calls until next() builds a new set.
        """
        if self._headers is None:
            # In the event someone calls get_headers instead of next()
            # immediately following construction, this will get everything
            # ready.
            if (self._digest is None or
                    self._encodednonce is None or
                    self._timestamp is None):
                self._generate_everything()
            # Yes, X-WSSE is all one header.
            self._headers = {
                "Authorization": 'WSSE profile="UsernameToken"',
                "X-WSSE": (
                    'UsernameToken '
                    'Username="%(_username)s", '
                    'PasswordDigest="%(_digest)s", '
                    'Nonce="%(_encodednonce)s", '
                    'Created="%(_timestamp)s"' % vars(self)),
            }
            # The nonce and time stamp have been used; clear them so the
            # next set of headers gets fresh ones unless the caller sets
            # them explicitly.
            self._clear_nonce_timestamp()
        return self._headers

    def set_nonce(self, nonce):
        "Set the next nonce.  Passing None clears it."
        self._nonce = nonce
        # Test against None explicitly: an "x and y or None" expression
        # would discard the (empty) encoding of an empty nonce.
        if nonce is None:
            self._encodednonce = None
        else:
            self._encodednonce = self._b64encode(nonce)

    def set_timestamp(self, timestamp):
        """Set the next timestamp.

        A datetime.datetime is truncated to whole seconds and stored in ISO
        format; any other value (including None, which clears the time
        stamp) is stored unchanged.
        """
        if isinstance(timestamp, datetime.datetime):
            self._generate_timestamp(timestamp)
        else:
            self._timestamp = timestamp

    # Private methods

    @staticmethod
    def _to_bytes(value):
        "Return value as a byte string, encoding text as UTF-8."
        # str on Python 2, bytes on Python 3.
        byte_type = type("".encode("ascii"))
        if isinstance(value, byte_type) or not hasattr(value, "encode"):
            return value
        return value.encode("utf-8")

    @staticmethod
    def _b64encode(string):
        "Base64 encode a string and return the result as a native string."
        encoded = base64.b64encode(
            WSSEUserTokenHeaderGenerator._to_bytes(string))
        # Python 3 returns bytes, but the header is built from native
        # strings.
        if not isinstance(encoded, str):
            encoded = encoded.decode("ascii")
        # Strip out any whitespace, which would mess up the HTTP header.
        return "".join(encoded.split())

    def _clear_headers(self):
        "Clear any pre-existing headers."
        self._headers = None
        self._digest = None

    def _clear_nonce_timestamp(self):
        "Clear the nonce and timestamp."
        self.set_nonce(None)
        self.set_timestamp(None)

    def _generate_digest(self):
        "Generate the UserToken digest from nonce + time stamp + secret."
        # The sha module is only used when the module-level import of
        # hashlib failed.
        if "hashlib" in globals():
            mdigest = hashlib.sha1()
        else:
            mdigest = sha.new()
        for part in (self._nonce, self._timestamp, self._secret):
            mdigest.update(self._to_bytes(part))
        self._digest = self._b64encode(mdigest.digest())

    def _generate_everything(self):
        "Prepare all the pieces to make new headers."
        # The nonce
        if self._nonce is None:
            self._generate_nonce()

        # The time stamp
        if self._timestamp is None:
            self._generate_timestamp()

        # The digest
        self._generate_digest()

    def _generate_nonce(self):
        "Generate a nonce of 32 random bytes."
        noncelength = 32
        try:
            nonce = os.urandom(noncelength)
        except NotImplementedError:
            # os.urandom found no randomness source; fall back to the random
            # module, which is not cryptographically secure.
            import random
            import struct
            # randint includes both end points, so 255 is the largest value
            # that still fits in one byte.
            values = [random.randint(0, 255) for _ in range(noncelength)]
            nonce = struct.pack("%dB" % noncelength, *values)
        self.set_nonce(nonce)

    def _generate_timestamp(self, timestamp=None):
        """Set the time stamp from a datetime, defaulting to the current time.

        The datetime is truncated to whole seconds and stored in ISO format.
        """
        if timestamp is None:
            timestamp = datetime.datetime.now(self._tzinfo)
        self.set_timestamp(timestamp.replace(microsecond=0).isoformat())

# vim:set bomb et sw=4 ts=4 tw=79:
